package kafkahelper

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/segmentio/kafka-go"
	"strconv"
	"time"
)

// WriterConfig holds the settings NewWriterHelper needs to build a Kafka
// writer.
type WriterConfig struct {
	// Addr is the list of addresses handed to kafka.TCP when the writer is
	// created (presumably "host:port" broker addresses; confirm with callers).
	Addr  []string
	// Topic is the topic configured on the writer; every pushed message is
	// written through that writer.
	Topic string
}

// WriterHelper wraps a *kafka.Writer and exposes helpers for pushing
// JSON-encoded messages through it. Create one with NewWriterHelper.
type WriterHelper struct {
	// w is the underlying kafka-go writer that all methods delegate to.
	w *kafka.Writer
}

// NewWriterHelper creates a WriterHelper backed by a kafka.Writer that
// connects to config.Addr, writes to config.Topic and uses the LeastBytes
// balancer. As a side effect it prints the configured topic to standard
// output.
func NewWriterHelper(config WriterConfig) *WriterHelper {
	fmt.Println("kafka topic:", config.Topic)
	return &WriterHelper{
		w: &kafka.Writer{
			Addr:     kafka.TCP(config.Addr...),
			Balancer: &kafka.LeastBytes{},
			// NOTE: When Topic is not defined here, each Message must define
			// it instead.
			Topic: config.Topic,
		},
	}
}

// writeMessages forwards msgs to the underlying kafka.Writer using ctx and
// returns whatever error the writer reports.
func (w *WriterHelper) writeMessages(ctx context.Context, msgs ...kafka.Message) error {
	err := w.w.WriteMessages(ctx, msgs...)
	return err
}

// Push JSON-encodes data and writes it to Kafka as a single message.
//
// The message key is the current Unix time in nanoseconds, formatted as a
// base-10 string. The write uses context.Background(), so this method itself
// imposes no deadline or cancellation.
//
// It returns the json.Marshal error if encoding fails, otherwise the error
// (if any) reported by the write.
func (w *WriterHelper) Push(data interface{}) error {
	payload, err := json.Marshal(data)
	if err != nil {
		return err
	}

	key := strconv.FormatInt(time.Now().UnixNano(), 10)
	return w.writeMessages(context.Background(), kafka.Message{
		Key:   []byte(key),
		Value: payload,
	})
}

// Close closes the underlying kafka.Writer and returns the error it reports,
// if any. The helper must not be used for further writes afterwards.
func (w *WriterHelper) Close() error {
	// Delegate to the wrapped writer (w.w). Calling w.Close() here would
	// invoke this method again and recurse until the stack overflows.
	return w.w.Close()
}
